package com.ycl.election;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.util.concurrent.ConcurrentHashMap;

/**
 * A server node in the cluster. Every node takes part in the leader election.
 *
 * <p>A node listens on its own port for election messages, votes for itself, asks every
 * other node for a vote, and announces a leader once one candidate holds a strict majority
 * of {@code totalNodes}.
 *
 * <p>Peers are addressed as {@code host:(9000 + peerId)} for peer ids {@code 1..totalNodes};
 * all peers are therefore assumed to be reachable through this node's own {@code host}.
 *
 * <p>NOTE(review): the pipeline uses Netty's {@code ObjectDecoder}, i.e. Java-native
 * deserialization of bytes read from the network. That is only safe between trusted peers;
 * consider a data-only wire format before exposing the port to untrusted clients.
 */
public class ElectionNode {
    /** Peer {@code i} is expected to listen on {@code PEER_BASE_PORT + i}. */
    private static final int PEER_BASE_PORT = 9000;

    private final int nodeId; // id of this node
    private final long zxId;  // ZXID of this node
    private volatile int leaderId; // id of the elected leader; 0 means "no leader yet"
    private final String host;
    private final int port;
    // Vote tally kept by this node: candidate id -> number of votes counted for it.
    private final ConcurrentHashMap<Integer, Integer> voteMap = new ConcurrentHashMap<>();
    private final int totalNodes; // total number of nodes in the cluster

    // One event loop group shared by every outbound connection, instead of one per message.
    private final EventLoopGroup clientGroup = new NioEventLoopGroup();
    // Server-side groups; assigned once start() has bound the port, released by shutdown().
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workerGroup;

    /**
     * Creates a node. Nothing is bound or connected until {@link #start()} is called.
     *
     * @param nodeId     id of this node (peer ids are expected to be {@code 1..totalNodes})
     * @param zxId       ZXID of this node, sent along with its messages
     * @param host       host used to reach the other nodes
     * @param port       port this node listens on
     * @param totalNodes total number of nodes in the cluster
     */
    public ElectionNode(int nodeId, long zxId, String host, int port, int totalNodes) {
        this.nodeId = nodeId;
        this.zxId = zxId;
        this.host = host;
        this.port = port;
        this.totalNodes = totalNodes;
    }

    /**
     * Starts the server side of this node and then kicks off the election.
     *
     * <p>The method returns once the port is bound and the vote requests have been handed
     * to the network layer; it does not wait for the election to finish. Call
     * {@link #shutdown()} to release the network threads.
     *
     * @throws Exception if the listening port cannot be bound or the thread is interrupted
     *                   while binding
     */
    public void start() throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(newPipelineInitializer());
            serverBootstrap.bind(port).sync();
        } catch (Exception e) {
            // A node that cannot listen must not pretend to be running: release the
            // threads created above and let the caller see the failure.
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw e;
        }
        bossGroup = boss;
        workerGroup = worker;
        System.out.println("Node " + nodeId + " started on port " + port);

        // Begin the election as soon as the node is able to receive messages.
        startElection();
    }

    /**
     * Releases the event loop groups used by this node. Safe to call more than once and
     * before {@link #start()}.
     */
    public void shutdown() {
        clientGroup.shutdownGracefully();
        EventLoopGroup boss = bossGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        EventLoopGroup worker = workerGroup;
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }

    /**
     * Starts the election: votes for this node first, then asks every other node for a vote.
     */
    private void startElection() {
        // Count the self-vote through the normal path so the majority check also runs;
        // this is what lets a single-node cluster elect itself.
        receiveVote(nodeId);

        for (int peerId = 1; peerId <= totalNodes; peerId++) {
            if (peerId != nodeId) {
                sendVoteRequest(host, PEER_BASE_PORT + peerId);
            }
        }
    }

    /**
     * Sends a vote request for this node to another node.
     *
     * <p>The send is asynchronous and best-effort: connection or write failures are
     * reported on {@code System.err} and never thrown, since a peer may simply not be up.
     *
     * @param targetHost host of the node to ask
     * @param targetPort port of the node to ask
     */
    public void sendVoteRequest(String targetHost, int targetPort) {
        ElectionMessage voteRequest =
                new ElectionMessage(ElectionMessage.MessageType.VOTE_REQUEST, nodeId, zxId, nodeId);
        sendMessage(targetHost, targetPort, voteRequest);
    }

    /**
     * Records one vote for the given candidate and, if some candidate now holds a strict
     * majority and no leader is known yet, adopts that candidate as leader and broadcasts
     * the result.
     *
     * @param nodeId id of the candidate the vote is for
     */
    public void receiveVote(int nodeId) {
        voteMap.merge(nodeId, 1, Integer::sum);

        boolean electedNow = false;
        // Votes can arrive on several network threads; the lock makes "no leader yet ->
        // pick leader" a single step so the result is announced only once.
        synchronized (this) {
            if (leaderId == 0) {
                // Find the candidate that actually holds the most votes, rather than
                // assuming it is the one this vote was for.
                int topCandidate = 0;
                int topVotes = 0;
                for (Integer candidate : voteMap.keySet()) {
                    int votes = voteMap.getOrDefault(candidate, 0);
                    if (votes > topVotes) {
                        topVotes = votes;
                        topCandidate = candidate;
                    }
                }
                if (topVotes > totalNodes / 2) {
                    setLeaderId(topCandidate);
                    electedNow = true;
                }
            }
        }

        // Network work stays outside the lock.
        if (electedNow) {
            broadcastElected();
        }
    }

    /**
     * Announces the election result to every other node.
     */
    private void broadcastElected() {
        for (int peerId = 1; peerId <= totalNodes; peerId++) {
            if (peerId != nodeId) {
                sendElectedMessage(host, PEER_BASE_PORT + peerId);
            }
        }
    }

    /**
     * Sends the election result (the current leader id) to another node.
     *
     * <p>The send is asynchronous and best-effort: connection or write failures are
     * reported on {@code System.err} and never thrown.
     *
     * @param targetHost host of the node to notify
     * @param targetPort port of the node to notify
     */
    public void sendElectedMessage(String targetHost, int targetPort) {
        int leader = leaderId; // single volatile read so both message fields agree
        ElectionMessage electedMessage =
                new ElectionMessage(ElectionMessage.MessageType.ELECTED, leader, zxId, leader);
        sendMessage(targetHost, targetPort, electedMessage);
    }

    /**
     * Connects to a peer and writes one message without blocking the calling thread.
     *
     * <p>The channel is left open after the write so that a reply can be handled by the
     * {@code ElectionHandler} installed in its pipeline.
     */
    private void sendMessage(String targetHost, int targetPort, ElectionMessage message) {
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .handler(newPipelineInitializer());

            // Listeners instead of sync(): this method can be reached from a network
            // thread, which must never block waiting on its own I/O.
            ChannelFuture connectFuture = bootstrap.connect(targetHost, targetPort);
            connectFuture.addListener(connectResult -> {
                if (!connectResult.isSuccess()) {
                    reportSendFailure("connect to", targetHost, targetPort, connectResult.cause());
                    return;
                }
                connectFuture.channel().writeAndFlush(message).addListener(writeResult -> {
                    if (!writeResult.isSuccess()) {
                        reportSendFailure("send to", targetHost, targetPort, writeResult.cause());
                    }
                });
            });
        } catch (RuntimeException e) {
            // e.g. the client group was already shut down; keep the send best-effort.
            reportSendFailure("connect to", targetHost, targetPort, e);
        }
    }

    /** Reports a failed outbound operation on {@code System.err}. */
    private void reportSendFailure(String action, String targetHost, int targetPort, Throwable cause) {
        System.err.println("Node " + nodeId + " failed to " + action + " "
                + targetHost + ":" + targetPort + ": " + cause);
    }

    /** Builds the channel pipeline shared by the server side and every outbound connection. */
    private ChannelInitializer<SocketChannel> newPipelineInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEncoder(),
                        new ElectionHandler(ElectionNode.this));
            }
        };
    }

    /**
     * Records the given node as leader and logs the decision.
     *
     * @param leaderId id of the node to treat as leader
     */
    public void setLeaderId(int leaderId) {
        this.leaderId = leaderId;
        System.out.println("Node " + nodeId + " sees Node " + leaderId + " as leader.");
    }

    /** @return id of this node */
    public int getNodeId() {
        return nodeId;
    }

    /** @return ZXID of this node */
    public long getZxId() {
        return zxId;
    }

    /** @return id of the leader this node currently knows, or 0 if none has been set */
    public int getLeaderId() {
        return leaderId;
    }
}
